#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Methods to call into SparkRBackend.


# Returns the result of Class.isInstance for a backend object, i.e. TRUE
# when the Java object referenced by jobj is an instance of className.
#
# jobj - a "jobj" reference to an object held by the SparkR backend
# className - name of the Java class, resolved via java.lang.Class.forName
isInstanceOf <- function(jobj, className) {
  # inherits() handles class vectors with more than one element, whereas
  # class(jobj) == "jobj" would reject any jobj carrying an extra class.
  stopifnot(inherits(jobj, "jobj"))
  cls <- callJStatic("java.lang.Class", "forName", className)
  callJMethod(cls, "isInstance", jobj)
}

# Call the Java method named methodName on the backend object referenced
# by objId, forwarding any further arguments to the method.
#
# objId - a "jobj" returned from the SparkRBackend
# methodName - name of the instance method to invoke
# ... - arguments passed through to invokeJava
#
# Stops if objId is not a jobj, or if isValidJobj() reports that the
# reference is no longer valid.
callJMethod <- function(objId, methodName, ...) {
  # inherits() handles class vectors with more than one element, whereas
  # class(objId) == "jobj" would reject any jobj carrying an extra class.
  stopifnot(inherits(objId, "jobj"))
  if (!isValidJobj(objId)) {
    stop("Invalid jobj ", objId$id,
         ". If SparkR was restarted, Spark operations need to be re-executed.")
  }
  # Only the id string is sent to the backend, not the jobj itself.
  invokeJava(isStatic = FALSE, objId$id, methodName, ...)
}

# Invoke the static method methodName of the Java class className.
#
# className - name of the class that owns the static method
# methodName - name of the static method to invoke
# ... - arguments forwarded unchanged to invokeJava
callJStatic <- function(className, methodName, ...) {
  staticCall <- TRUE
  invokeJava(isStatic = staticCall, className, methodName, ...)
}

# Construct a new backend object of class className.
#
# className - name of the Java class to instantiate
# ... - constructor arguments, forwarded unchanged to invokeJava
newJObject <- function(className, ...) {
  # The constructor is requested through the special method name "<init>".
  constructorName <- "<init>"
  invokeJava(isStatic = TRUE, className, methodName = constructorName, ...)
}

# Ask the SparkR backend to drop the object identified by objId.
# This happens automatically when a jobj is garbage collected.
#
# objId - id of the backend object to remove
removeJObject <- function(objId) {
  handlerClass <- "SparkRHandler"
  removeMethod <- "rm"
  invokeJava(isStatic = TRUE, handlerClass, removeMethod, objId)
}

# Tells whether a call description is the static SparkRHandler "rm" call,
# i.e. the same triple that removeJObject passes to invokeJava.
#
# isStatic - whether the call is a static call
# objId - object id or class name the call targets
# methodName - name of the method being invoked
isRemoveMethod <- function(isStatic, objId, methodName) {
  handlerClass <- "SparkRHandler"
  removeMethod <- "rm"
  # Short-circuit chain: later comparisons only run if earlier ones hold.
  isStatic == TRUE && objId == handlerClass && methodName == removeMethod
}

# Invoke a Java method on the SparkR backend. Users
# should typically use one of the higher level methods like
# callJMethod, callJStatic etc. instead of using this.
#
# isStatic - TRUE if the method to be called is static
# objId - String that refers to the object on which method is invoked
#         Should be a jobj id for non-static methods and the classname
#         for static methods
# methodName - name of method to be invoked
# ... - arguments for the method, serialized with writeArgs
#
# Returns the value read back from the backend with readObject. Stops if
# no backend connection is registered in .sparkREnv, or with the message
# sent by the backend when it reports a non-zero status.
invokeJava <- function(isStatic, objId, methodName, ...) {
  if (!exists(".sparkRCon", .sparkREnv)) {
    stop("No connection to backend found. Please re-run sparkR.init")
  }

  # Flush the removals queued in .toRemoveJobjs before issuing this call.
  # This is skipped when the call is itself a removal, because
  # removeJObject comes back through invokeJava.
  if (!isRemoveMethod(isStatic, objId, methodName)) {
    objsToRemove <- ls(.toRemoveJobjs)
    if (length(objsToRemove) > 0) {
      for (pendingId in objsToRemove) {
        removeJObject(pendingId)
      }
      rm(list = objsToRemove, envir = .toRemoveJobjs)
    }
  }

  # Serialize the call description into an in-memory buffer. The buffer
  # is closed in `finally` so it is not leaked when serialization fails.
  payloadCon <- rawConnection(raw(0), "r+")
  bytesToSend <- tryCatch({
    writeBoolean(payloadCon, isStatic)
    writeString(payloadCon, objId)
    writeString(payloadCon, methodName)

    callArgs <- list(...)
    writeInt(payloadCon, length(callArgs))
    writeArgs(payloadCon, callArgs)

    rawConnectionValue(payloadCon)
  }, finally = close(payloadCon))

  # Construct the whole request message (length prefix followed by the
  # payload) to send it once, avoiding a write-write-read pattern in case
  # of Nagle's algorithm.
  # Refer to http://en.wikipedia.org/wiki/Nagle%27s_algorithm for the details.
  messageCon <- rawConnection(raw(0), "r+")
  requestMessage <- tryCatch({
    writeInt(messageCon, length(bytesToSend))
    writeBin(bytesToSend, messageCon)
    rawConnectionValue(messageCon)
  }, finally = close(messageCon))

  conn <- get(".sparkRCon", .sparkREnv)
  writeBin(requestMessage, conn)

  # A non-zero status is followed by an error string from the backend;
  # otherwise the result object follows.
  returnStatus <- readInt(conn)
  if (returnStatus != 0) {
    stop(readString(conn))
  }
  readObject(conn)
}
